# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This code sample demonstrates using the low-level generated client for Python.
# Provide your project, dataset, and table details in the append_rows_proto2
# call at the bottom of this file.

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2

##################################################################################
# To regenerate the protocol buffer module from your sample_data.proto file, run:
#
#   protoc --python_out=. sample_data.proto
#
##################################################################################

import sample_data_pb2


def append_rows_proto2(project_id: str, dataset_id: str, table_id: str):
    """Create a write stream, write some sample data, and commit the stream."""
    write_client = bigquery_storage_v1.BigQueryWriteClient()
    parent = write_client.table_path(project_id, dataset_id, table_id)
    write_stream = types.WriteStream()

    # When creating the stream, choose the type. Use the PENDING type to wait
    # until the stream is committed before it is visible. See:
    # https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
    write_stream.type_ = types.WriteStream.Type.PENDING
    write_stream = write_client.create_write_stream(
        parent=parent, write_stream=write_stream
    )
    stream_name = write_stream.name

    # Create a template with fields needed for the first request.
    request_template = types.AppendRowsRequest()

    # The initial request must contain the stream name.
    request_template.write_stream = stream_name

    # So that BigQuery knows how to parse the serialized_rows, generate a
    # protocol buffer representation of your message descriptor.
    proto_schema = types.ProtoSchema()
    proto_descriptor = descriptor_pb2.DescriptorProto()
    sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema.proto_descriptor = proto_descriptor
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.writer_schema = proto_schema
    request_template.proto_rows = proto_data

    # Some stream types support an unbounded number of requests. Construct an
    # AppendRowsStream to send an arbitrary number of requests to a stream.
    append_rows_stream = writer.AppendRowsStream(write_client, request_template)
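    # sample_data.proto itself is not shown in this file. As a reference only,
    # a proto2 definition consistent with the fields set below might look like
    # the following (the field names must match the table schema; the field
    # numbers here are illustrative assumptions, not taken from the real file):
    #
    #   syntax = "proto2";
    #
    #   message SampleData {
    #     optional string creationTime = 1;
    #     optional string id = 2;
    #     optional string name = 3;
    #     optional string severity = 4;
    #     optional string source = 5;
    #     optional string destination = 6;
    #     optional bool accessGranted = 7;
    #     optional string labels = 8;
    #   }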
    # Create a batch of row data by appending proto2 serialized bytes to the
    # serialized_rows repeated field.
    proto_rows = types.ProtoRows()

    row = sample_data_pb2.SampleData()
    row.creationTime = "2020-10-22 21:12:44 UTC"
    row.id = "fZTLIbppjFXOmy"
    row.name = "User Login"
    row.severity = "info"
    row.source = "Login Server"
    row.destination = "www.acmelogin.com"
    row.accessGranted = True
    row.labels = '{"property":"Authentication Success","threatRating":0.1,"type":"login","method":"bqStorageAPI"}'
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.creationTime = "2020-10-22 21:28:14 UTC"
    row.id = "aOShYdrZEOdUfC"
    row.name = "User Login"
    row.severity = "warning"
    row.accessGranted = False
    row.labels = '{"property":"Authentication Success","threatRating":null,"method":"bqStorageAPI"}'
    proto_rows.serialized_rows.append(row.SerializeToString())

    # Set an offset to allow resuming this stream if the connection breaks.
    # Keep track of which requests the server has acknowledged and resume the
    # stream at the first non-acknowledged message. If the server has already
    # processed a message with that offset, it will return an ALREADY_EXISTS
    # error, which can be safely ignored.
    #
    # The first request must always have an offset of 0.
    request = types.AppendRowsRequest()
    request.offset = 0
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    response_future_1 = append_rows_stream.send(request)

    # Second batch.
    proto_rows = types.ProtoRows()

    row = sample_data_pb2.SampleData()
    row.creationTime = "2020-10-22 21:57:14 UTC"
    row.id = "VNxAfWbpdPimKG"
    row.name = "User Logout"
    row.severity = "info"
    row.source = "Login Server"
    row.destination = "www.acmelogin.com"
    row.accessGranted = True
    row.labels = '{"property":"Authentication Success","threatRating":0.1,"type":"log out","method":"bqStorageAPI"}'
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.creationTime = "2020-10-22 21:24:14 UTC"
    row.id = "vZWRZbgZcCEalB"
    row.name = "User Login"
    row.severity = "critical"
    row.accessGranted = False
    row.labels = '{"property":"Authentication Success","threatRating":0.5,"type":"login failure","c":[10, 20,"test"],"method":"bqStorageAPI"}'
    proto_rows.serialized_rows.append(row.SerializeToString())

    # Since this is the second request, you only need to include the row data.
    # The name of the stream and the protocol buffer DESCRIPTOR are only needed
    # in the first request.
    request = types.AppendRowsRequest()
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    # The offset must equal the number of rows that were previously sent.
    request.offset = 2

    response_future_2 = append_rows_stream.send(request)

    # Both requests are in flight; wait for them to finish being processed
    # before finalizing the stream.
    print(response_future_1.result())
    print(response_future_2.result())

    # Shutdown background threads and close the streaming connection.
    append_rows_stream.close()
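    # If the connection had dropped mid-stream, the appends could be resumed by
    # re-sending batches starting from the first unacknowledged offset. A
    # minimal sketch of ignoring the ALREADY_EXISTS error mentioned above
    # (google.api_core is installed alongside this client library):
    #
    #   from google.api_core import exceptions
    #
    #   try:
    #       append_rows_stream.send(request).result()
    #   except exceptions.AlreadyExists:
    #       # The server already processed this offset; safe to skip.
    #       pass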
    # A PENDING type stream must be "finalized" before being committed. No new
    # records can be written to the stream after this method has been called.
    write_client.finalize_write_stream(name=write_stream.name)

    # Commit the stream you created earlier.
    batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
    batch_commit_write_streams_request.parent = parent
    batch_commit_write_streams_request.write_streams = [write_stream.name]
    write_client.batch_commit_write_streams(batch_commit_write_streams_request)

    print(f"Writes to stream: '{write_stream.name}' have been committed.")


if __name__ == "__main__":
    # Change these to your specific BigQuery project, dataset, and table details.
    append_rows_proto2("", "json_example", "streaming_events")
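
# To verify the committed rows, you could query the table afterwards, for
# example with the separate google-cloud-bigquery client library (assumed to
# be installed and configured with a default project; the table below matches
# the sample arguments above):
#
#   from google.cloud import bigquery
#
#   client = bigquery.Client()
#   query = "SELECT id, name, severity FROM `json_example.streaming_events`"
#   for result_row in client.query(query):
#       print(result_row)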